package com.ikingtech.framework.sdk.job.scheduler;

import io.netty.util.HashedWheelTimer;
import lombok.extern.slf4j.Slf4j;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author tie yan
 */
@Slf4j
public class JobScheduler<T extends Serializable> {

    private final ConcurrentSkipListMap<Long, List<ScheduledJob<T>>> executingJobMap = new ConcurrentSkipListMap<>();

    private final List<String> canceledJobs = new ArrayList<>();

    private JobReminder<T> reminder;

    private final HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(1, TimeUnit.SECONDS);

    ThreadPoolExecutor scheduleThreadPool = new ThreadPoolExecutor(
            // 核心线程数量
            4,
            // 最大线程数量
            100,
            // 空闲线程的最长存活时间
            60L,
            // 存活时间的单位
            TimeUnit.SECONDS,
            // 工作队列
            new LinkedBlockingQueue<>(3000),
            runnable -> new Thread(runnable, "job-schedule-" + runnable.hashCode())
    );

    public void init(List<ScheduledJob<T>> jobs, JobReminder<T> reminder) {
        this.reminder = reminder;
        jobs.forEach(this::schedule);
    }

    public void trick(ScheduledJob<T> job) {
        this.hashedWheelTimer.newTimeout(task -> this.execute(job), job.getExecuteTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    private void execute(ScheduledJob<T> job) {
        if (this.canceledJobs.contains(job.getSerialNo())) {
            this.canceledJobs.remove(job.getSerialNo());
            return;
        }
        this.scheduleThreadPool.execute(() -> this.reminder.remind(job));
    }

    public void schedule(ScheduledJob<T> job) {
        this.executingJobMap.computeIfAbsent(job.getExecuteTime(), k -> new ArrayList<>()).add(job);
        this.trick(job);
    }

    public void cancel(String serialNo) {
        this.canceledJobs.add(serialNo);
    }
}
